From c6bfe8a8909091b6c0e5202745d9037715158046 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Thu, 10 Sep 2020 14:45:20 +0200 Subject: [PATCH] Work on auto sharding --- Debug/makefile | 1 + Debug/sources.mk | 1 + Debug/src/omap/subdir.mk | 20 +++++ Release/makefile | 1 + Release/sources.mk | 1 + Release/src/omap/subdir.mk | 20 +++++ include/omap/omap.h | 62 +++++++++++++ include/siri/db/db.h | 2 +- include/siri/db/points.h | 1 + include/siri/db/query.h | 2 +- include/siri/db/series.h | 2 + include/siri/db/shard.h | 15 ++-- include/siri/db/shards.h | 2 + src/omap/omap.c | 151 ++++++++++++++++++++++++++++++++ src/siri/db/db.c | 3 +- src/siri/db/points.c | 42 +++++++++ src/siri/db/series.c | 5 +- src/siri/db/shard.c | 71 ++++++++++++--- src/siri/db/shards.c | 155 +++++++++++++++++++++++---------- test/test_siridb/sources | 1 + test/test_siridb/test_siridb.c | 40 +++++++++ 21 files changed, 531 insertions(+), 67 deletions(-) create mode 100644 Debug/src/omap/subdir.mk create mode 100644 Release/src/omap/subdir.mk create mode 100644 include/omap/omap.h create mode 100644 src/omap/omap.c diff --git a/Debug/makefile b/Debug/makefile index c43b8139..0a89e84e 100644 --- a/Debug/makefile +++ b/Debug/makefile @@ -31,6 +31,7 @@ RM := rm -rf -include src/iso8601/subdir.mk -include src/lib/subdir.mk -include src/imap/subdir.mk +-include src/omap/subdir.mk -include src/expr/subdir.mk -include src/ctree/subdir.mk -include src/cfgparser/subdir.mk diff --git a/Debug/sources.mk b/Debug/sources.mk index 35930d3f..02a3425b 100644 --- a/Debug/sources.mk +++ b/Debug/sources.mk @@ -22,6 +22,7 @@ src/lib \ src/llist \ src/lock \ src/logger \ +src/omap \ src/owcrypt \ src/procinfo \ src/qpack \ diff --git a/Debug/src/omap/subdir.mk b/Debug/src/omap/subdir.mk new file mode 100644 index 00000000..8a60ff02 --- /dev/null +++ b/Debug/src/omap/subdir.mk @@ -0,0 +1,20 @@ +# Add inputs and outputs from these tool invocations to the build variables +C_SRCS += \ +../src/omap/omap.c + +OBJS += \ +./src/omap/omap.o + +C_DEPS += \ +./src/omap/omap.d + + +# Each subdirectory must supply rules for building sources it contributes +src/omap/%.o: ../src/omap/%.c + @echo 'Building file: $<' + @echo 'Invoking: GCC C Compiler' + gcc -I../include -O0 -g3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" + @echo 'Finished building: $<' + @echo ' ' + + diff --git a/Release/makefile b/Release/makefile index cb978b7b..497e4198 100644 --- a/Release/makefile +++ b/Release/makefile @@ -31,6 +31,7 @@ RM := rm -rf -include src/iso8601/subdir.mk -include src/lib/subdir.mk -include src/imap/subdir.mk +-include src/omap/subdir.mk -include src/expr/subdir.mk -include src/ctree/subdir.mk -include src/cfgparser/subdir.mk diff --git a/Release/sources.mk b/Release/sources.mk index 35930d3f..02a3425b 100644 --- a/Release/sources.mk +++ b/Release/sources.mk @@ -22,6 +22,7 @@ src/lib \ src/llist \ src/lock \ src/logger \ +src/omap \ src/owcrypt \ src/procinfo \ src/qpack \ diff --git a/Release/src/omap/subdir.mk b/Release/src/omap/subdir.mk new file mode 100644 index 00000000..cd3db935 --- /dev/null +++ b/Release/src/omap/subdir.mk @@ -0,0 +1,20 @@ +# Add inputs and outputs from these tool invocations to the build variables +C_SRCS += \ +../src/omap/omap.c + +OBJS += \ +./src/omap/omap.o + +C_DEPS += \ +./src/omap/omap.d + + +# Each subdirectory must supply rules for building sources it contributes +src/omap/%.o: ../src/omap/%.c + @echo 'Building file: $<' + @echo 'Invoking: GCC C Compiler' + gcc -DNDEBUG -I../include -O3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" + @echo 'Finished building: $<' + @echo ' ' + + diff --git a/include/omap/omap.h b/include/omap/omap.h new file mode 100644 index 00000000..7b5d5fd9 --- /dev/null +++ b/include/omap/omap.h @@ -0,0 +1,62 @@ +/* + * util/omap.h + */ +#ifndef OMAP_H_ +#define OMAP_H_ + +enum +{ + OMAP_ERR_EXIST =-2, + OMAP_ERR_ALLOC =-1, + OMAP_SUCCESS =0 +}; + +typedef struct omap_s omap_t; +typedef struct omap__s omap__t; +typedef struct omap__s * omap_iter_t; + +#include + +typedef void (*omap_destroy_cb)(void * data); + +/* private */ +struct omap__s +{ + omap__t * next_; + uint64_t id_; + void * data_; +}; + +omap_t * omap_create(void); +void omap_destroy(omap_t * omap, omap_destroy_cb cb); +int omap_add(omap_t * omap, uint64_t id, void * data); +void * omap_set(omap_t * omap, uint64_t id, void * data); +void * omap_get(omap_t * omap, uint64_t id); +uint64_t * omap_last_id(omap_t * omap); +void * omap_rm(omap_t * omap, uint64_t id); +static inline omap_iter_t omap_iter(omap_t * omap); +static inline uint64_t omap_iter_id(omap_iter_t iter); +#define omap_each(iter__, dt__, var__) \ + dt__ * var__; \ + iter__ && \ + (var__ = (dt__ *) iter__->data_); \ + iter__ = iter__->next_ + +struct omap_s +{ + omap__t * next_; + size_t n; +}; + +static inline omap_iter_t omap_iter(omap_t * omap) +{ + return omap->next_; +} + +static inline uint64_t omap_iter_id(omap_iter_t iter) +{ + return iter->id_; +} + + +#endif /* OMAP_H_ */ diff --git a/include/siri/db/db.h b/include/siri/db/db.h index bfdea944..be216938 100644 --- a/include/siri/db/db.h +++ b/include/siri/db/db.h @@ -96,7 +96,7 @@ struct siridb_s uv_mutex_t series_mutex; uv_mutex_t shards_mutex; uv_mutex_t values_mutex; - imap_t * shards; + imap_t * shards; /* contains lists with shards */ FILE * dropped_fp; qp_fpacker_t * store; siridb_fifo_t * fifo; diff --git a/include/siri/db/points.h b/include/siri/db/points.h index 19172635..bee1fdd3 100644 --- a/include/siri/db/points.h +++ b/include/siri/db/points.h @@ -86,6 +86,7 @@ int siridb_points_unzip_string_raw( uint8_t * bits, uint16_t len); size_t siridb_points_get_size_zipped(uint16_t cinfo, uint16_t len); +uint64_t siridb_points_get_interval(siridb_points_t * points); #define siridb_points_zip(p__, s__, e__, c__, z__) \ ((p__)->tp == TP_INT) ? \ diff --git a/include/siri/db/query.h b/include/siri/db/query.h index fbe2ef76..d522b1a0 100644 --- a/include/siri/db/query.h +++ b/include/siri/db/query.h @@ -82,7 +82,7 @@ struct siridb_query_s qp_packer_t * timeit; cleri_parse_t * pr; siridb_nodes_t * nodes; - struct timespec start; + struct timespec start;SIRIDB_IS64BIT }; #endif /* SIRIDB_QUERY_H_ */ diff --git a/include/siri/db/series.h b/include/siri/db/series.h index 96f2d8f9..d3aa23c9 100644 --- a/include/siri/db/series.h +++ b/include/siri/db/series.h @@ -65,11 +65,13 @@ struct siridb_series_s uint32_t length; uint32_t idx_len; long int bf_offset; + uint64_t interval; siridb_points_t * buffer; char * name; idx_t * idx; siridb_t * siridb; }; + #include int siridb_series_load(siridb_t * siridb); diff --git a/include/siri/db/shard.h b/include/siri/db/shard.h index 85bacfa5..4d6813ab 100644 --- a/include/siri/db/shard.h +++ b/include/siri/db/shard.h @@ -37,18 +37,22 @@ typedef struct siridb_shard_view_s siridb_shard_view_t; #include #include #include +#include siridb_shard_t * siridb_shard_create( siridb_t * siridb, + omap_t * shards, uint64_t id, uint64_t duration, uint8_t tp, siridb_shard_t * replacing); +uint64_t siridb_shard_duration_from_interval(siridb_t * siridb, uint64_t interval); +uint64_t siridb_shard_interval_from_duration(uint64_t duration); int siridb_shard_cexpr_cb( siridb_shard_view_t * vshard, cexpr_condition_t * cond); int siridb_shard_status(char * str, siridb_shard_t * shard); -int siridb_shard_load(siridb_t * siridb, uint64_t id); +int siridb_shard_load(siridb_t * siridb, uint64_t id, uint64_t duration); void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb); size_t siridb_shard_write_points( siridb_t * siridb, @@ -113,13 +117,14 @@ struct siridb_shard_flags_repr_s struct siridb_shard_s { - uint32_t ref; /* keep ref on top */ - uint8_t tp; /* TP_NUMBER, TP_LOG */ + uint32_t ref; /* keep ref on top */ + uint8_t tp; /* TP_NUMBER, TP_LOG */ uint8_t flags; uint16_t max_chunk_sz; uint64_t id; - size_t len; - size_t size; + size_t len; /* size of the shard which is used */ + size_t size; /* size of shard on disk */ + size_t duration; /* based on the interval of series */ siri_fp_t * fp; char * fn; siridb_shard_t * replacing; diff --git a/include/siri/db/shards.h b/include/siri/db/shards.h index 773bdbfb..ed7c64c3 100644 --- a/include/siri/db/shards.h +++ b/include/siri/db/shards.h @@ -18,7 +18,9 @@ #define SIRIDB_SHARDS_PATH "shards/" #include +#include +void siridb_shards_destroy_cb(omap_t * shards); int siridb_shards_load(siridb_t * siridb); int siridb_shards_add_points( siridb_t * siridb, diff --git a/src/omap/omap.c b/src/omap/omap.c new file mode 100644 index 00000000..999442f3 --- /dev/null +++ b/src/omap/omap.c @@ -0,0 +1,151 @@ +/* + * util/omap.h + */ +#include +#include +#include + +static void * omap__rm(omap_t * omap, omap__t ** omap_); +static omap__t * omap__new(uint64_t id, void * data, omap__t * next); + +omap_t * omap_create(void) +{ + omap_t * omap = malloc(sizeof(omap_t)); + if (!omap) + { + return NULL; + } + + omap->next_ = NULL; + omap->n = 0; + + return omap; +} + +void omap_destroy(omap_t * omap, omap_destroy_cb cb) +{ + if (!omap) + { + return; + } + omap__t * cur = (omap__t *) omap; + omap__t * tmp; + + for (; (tmp = cur->next_); cur = tmp) + { + if (cb && tmp) + { + (*cb)(tmp->data_); + } + free(cur); + } + free(cur); +} + +/* + * In case of a duplicate id the return value is OMAP_ERR_EXIST and data + * will NOT be overwritten. On success the return value is OMAP_SUCCESS and + * if a memory error has occurred the return value is OMAP_ERR_ALLOC. + */ +int omap_add(omap_t * omap, uint64_t id, void * data) +{ + assert (omap); + assert (data); + omap__t * cur, * tmp; + + for ( cur = (omap__t *) omap; + cur->next_ && cur->next_->id_ < id; + cur = cur->next_); + + if (cur->next_ && cur->next_->id_ == id) + { + return OMAP_ERR_EXIST; + } + + tmp = omap__new(id, data, cur->next_); + if (!tmp) + { + return OMAP_ERR_ALLOC; + } + + omap->n++; + cur->next_ = tmp; + + return OMAP_SUCCESS; +} + +/* + * In case of a duplicate id the return value is the previous value and data + * will be overwritten. On success the return value is equal to void*data and + * if a memory error has occurred the return value is NULL. + */ +void * omap_set(omap_t * omap, uint64_t id, void * data) +{ + assert (omap); + assert (data); + omap__t * cur, * tmp; + + for ( cur = (omap__t *) omap; + cur->next_ && cur->next_->id_ < id; + cur = cur->next_); + + if (cur->next_ && cur->next_->id_ == id) + { + void * prev = cur->next_->data_; + cur->next_->data_ = data; + return prev; + } + + tmp = omap__new(id, data, cur->next_); + if (!tmp) + return NULL; + + omap->n++; + cur->next_ = tmp; + + return data; +} + +void * omap_get(omap_t * omap, uint64_t id) +{ + omap__t * cur = (omap__t *) omap; + while ((cur = cur->next_) && cur->id_ < id); + + return cur && cur->id_ == id ? cur->data_ : NULL; +} + +void * omap_rm(omap_t * omap, uint64_t id) +{ + omap__t * cur, * prev = (omap__t *) omap; + while ((cur = prev->next_) && cur->id_ < id) + { + prev = cur; + } + + return cur && cur->id_ == id ? omap__rm(omap, &prev->next_) : NULL; +} + +static void * omap__rm(omap_t * omap, omap__t ** omap_) +{ + omap__t * cur = *omap_; + void * data = cur->data_; + *omap_ = cur->next_; + + free(cur); + --omap->n; + + return data; +} + +static omap__t * omap__new(uint64_t id, void * data, omap__t * next) +{ + omap__t * omap = malloc(sizeof(omap__t)); + if (!omap) + return NULL; + + omap->id_ = id; + omap->data_ = data; + omap->next_ = next; + + return omap; +} diff --git a/src/siri/db/db.c b/src/siri/db/db.c index 24256b46..152cd12d 100644 --- a/src/siri/db/db.c +++ b/src/siri/db/db.c @@ -684,6 +684,7 @@ int siridb_save(siridb_t * siridb) qp_close(fpacker)); } + /* * Destroy SiriDB object. * @@ -764,7 +765,7 @@ void siridb__free(siridb_t * siridb) /* free shards using imap walk an free the imap */ if (siridb->shards != NULL) { - imap_free(siridb->shards, (imap_free_cb) &siridb__shard_decref); + imap_free(siridb->shards, (imap_free_cb) &siridb_shards_destroy_cb); } if (siridb->groups != NULL) diff --git a/src/siri/db/points.c b/src/siri/db/points.c index 18788724..e672a776 100644 --- a/src/siri/db/points.c +++ b/src/siri/db/points.c @@ -15,6 +15,7 @@ #define POINTS_MAX_QSORT 250000 #define RAW_VALUES_THRESHOLD 7 #define DICT_SZ 0x3fff +#define TOLERANCE_INTERVAL_DETECT 8 static unsigned char * POINTS_zip_raw( siridb_points_t * points, @@ -1686,6 +1687,47 @@ static int POINTS_set_cinfo_size(uint16_t * cinfo, size_t * size) return 0; } +uint64_t siridb_points_get_interval(siridb_points_t * points) +{ + size_t i, j, n; + uint64_t * arr; + uint64_t x, a, b, c; + + n = points->len - 1; + n = n > 63 ? 63 : n; + if (n < 7) + { + return 0; + } + + arr = malloc(n * sizeof(uint64_t)); + if (arr == NULL) + { + return 0; + } + + for (i = 0; i < n; ++i) + { + x = points->data[i+1].ts - points->data[i].ts; + for (j = i; j > 0 && arr[j-1] > x; --j) + { + arr[j] = arr[j-1]; + } + arr[j] = x; + } + + a = n/4; + b = n/2; + c = arr[(b<<1)-a]; + a = arr[a]; + b = arr[b]; + x = b / (100 / TOLERANCE_INTERVAL_DETECT); + x = (a+x < b || c-x > b) ? 0 : b; + + free(arr); + return x; +} + inline static uint16_t POINTS_hash(uint32_t h) { return ((h >> 17) ^ (h & 0xffff)) & DICT_SZ; diff --git a/src/siri/db/series.c b/src/siri/db/series.c index 1510b237..c7c6b937 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -1039,9 +1039,7 @@ int siridb_series_optimize_shard( siridb_points_t *__restrict points; int rc; uint16_t cinfo = 0; - uint64_t duration = (shard->tp == SIRIDB_SHARD_TP_NUMBER) ? - siridb->duration_num : siridb->duration_log; - max_ts = (shard->id + duration) - series->mask; + max_ts = (shard->id + shard->duration) - series->mask; rc = new_idx = end = i = size = start = 0; @@ -1355,6 +1353,7 @@ static siridb_series_t * SERIES_new( series->idx_len = 0; series->idx = NULL; series->siridb = siridb; + series->interval = 0; /* get sum series name to calculate series mask (for sharding) */ for (n = 0; *name; name++) diff --git a/src/siri/db/shard.c b/src/siri/db/shard.c index 1cfa1878..4b5e1c6d 100644 --- a/src/siri/db/shard.c +++ b/src/siri/db/shard.c @@ -32,6 +32,9 @@ /* shard schema (schemas below 20 are reserved for Python SiriDB) */ #define SIRIDB_SHARD_SHEMA 21 +/* optimal points in a single shard */ +#define OPTIMAL_POINTS_PER_SHARD 2000 + /* * Header schema layout * @@ -130,11 +133,51 @@ static size_t SHARD_write_header( FILE * fp); static int SHARD_remove(siridb_shard_t * shard); +uint64_t siridb_shard_duration_from_interval(siridb_t * siridb, uint64_t interval) +{ + uint64_t x, n, week, day, hour; + + n = interval * OPTIMAL_POINTS_PER_SHARD; + + if (n == siridb->duration_num) + { + return siridb->duration_num; + } + + if (n == siridb->duration_log) + { + return siridb->duration_log; + } + + week = 3600*24*7*siridb->time->factor; + x = n / week; + if (x) + { + return (x + 1) * week; + } + + day = 3600*24*siridb->time->factor; + x = n / day; + if (x) + { + return (x + 1) * day; + } + + hour = 3600*siridb->time->factor; + x = n / hour; + return (x + 1) * hour; +} + +uint64_t siridb_shard_interval_from_duration(uint64_t duration) +{ + return duration / OPTIMAL_POINTS_PER_SHARD;; +} + /* * Returns 0 if successful or -1 in case of an error. * When an error occurs, a SIGNAL can be raised in some cases but not for sure. */ -int siridb_shard_load(siridb_t * siridb, uint64_t id) +int siridb_shard_load(siridb_t * siridb, uint64_t id, uint64_t duration) { int is_ts64; FILE * fp; @@ -152,10 +195,13 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id) free(shard); return -1; /* signal is raised */ } + shard->id = id; shard->ref = 1; shard->len = HEADER_SIZE; shard->replacing = NULL; + shard->duration = duration; + if (SHARD_init_fn(siridb, shard) < 0) { ERR_ALLOC @@ -163,7 +209,6 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id) return -1; /* signal is raised */ } - log_info("Loading shard %" PRIu64, id); if ((fp = fopen(shard->fn, "r")) == NULL) @@ -290,12 +335,15 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id) */ siridb_shard_t * siridb_shard_create( siridb_t * siridb, + omap_t * shards, uint64_t id, uint64_t duration, uint8_t tp, siridb_shard_t * replacing) { siridb_shard_t * shard = malloc(sizeof(siridb_shard_t)); + FILE * fp; + if (shard == NULL) { ERR_ALLOC @@ -311,12 +359,12 @@ siridb_shard_t * siridb_shard_create( shard->tp = tp; shard->replacing = replacing; shard->len = shard->size = HEADER_SIZE; + shard->duration = duration; shard->max_chunk_sz = (replacing == NULL) ? (tp == SIRIDB_SHARD_TP_NUMBER ? DEFAULT_MAX_CHUNK_SZ_NUM : DEFAULT_MAX_CHUNK_SZ_LOG) : replacing->max_chunk_sz; - FILE * fp; if (SHARD_init_fn(siridb, shard) < 0) { siridb_shard_decref(shard); @@ -376,7 +424,7 @@ siridb_shard_t * siridb_shard_create( return NULL; } - if (imap_set(siridb->shards, id, shard) == -1) + if (omap_set(shards, duration, shard) == NULL) { siridb_shard_decref(shard); ERR_ALLOC @@ -579,6 +627,7 @@ size_t siridb_shard_write_points( { size_t p = 0; size_t ts_sz = siridb->time->ts_sz; + cdata = malloc(dsize); if (cdata == NULL) { @@ -1201,8 +1250,6 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb) { int rc = 0; siridb_shard_t * new_shard = NULL; - uint64_t duration = (shard->tp == SIRIDB_SHARD_TP_NUMBER) ? - siridb->duration_num : siridb->duration_log; siridb_series_t * series; size_t i; @@ -1214,10 +1261,14 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb) */ if (~shard->flags & SIRIDB_SHARD_IS_REMOVED) { + omap_t * shards = imap_get(siridb->shards, shard->id); + assert (shards); + if ((new_shard = siridb_shard_create( siridb, + shards, shard->id, - duration, + shard->duration, shard->tp, shard)) == NULL) { @@ -1289,7 +1340,7 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb) if ( !siri_err && siri.optimize->status != SIRI_OPTIMIZE_CANCELLED && - shard->id % duration == series->mask && + shard->id % shard->duration == series->mask && (~series->flags & SIRIDB_SERIES_IS_DROPPED) && (~new_shard->flags & SIRIDB_SHARD_IS_REMOVED)) { @@ -1874,12 +1925,12 @@ static inline int SHARD_init_fn(siridb_t * siridb, siridb_shard_t * shard) { return asprintf( &shard->fn, - "%s%s%s%" PRIu64 "%s", + "%s%s%s%016"PRIX64"_%016"PRIX64".sdb", siridb->dbpath, SIRIDB_SHARDS_PATH, (shard->replacing == NULL) ? "" : "__", shard->id, - ".sdb"); + shard->duration); } /* diff --git a/src/siri/db/shards.c b/src/siri/db/shards.c index 5eb1c95a..b62a89f6 100644 --- a/src/siri/db/shards.c +++ b/src/siri/db/shards.c @@ -25,11 +25,68 @@ #include #include #include +#include -#define SIRIDB_MAX_SHARD_FN_LEN 23 +#define SIRIDB_SHARD_LEN 37 + + +static bool SHARDS_read_id_and_duration( + char * fn, + const char * ext, + uint64_t * shard_id, + uint64_t * duration) +{ + size_t n = strlen(fn); + char * tmp = NULL; + + if (n != SIRIDB_SHARD_LEN) + { + return false; + } + + *shard_id = strtoull(fn, &tmp, 16); + if (tmp == NULL) + { + return false; + } + fn = tmp; + + if (*fn != '_') + { + return false; + } + + *duration = strtoull(fn, &tmp, 16); + if (tmp == NULL) + { + return false; + } + fn = tmp; + + return strcmp(fn, ext) == 0; +} + +/* + * Returns true if fn is a temp shard or index filename, false if not. + */ +static bool SHARDS_is_temp_fn(char * fn) +{ + int i; + uint64_t shard_id, duration; + for (i = 0; i < 2; i++, fn++) + { + if (*fn != '_') + { + return false; + } + } + + return ( + SHARDS_read_id_and_duration(fn, ".sdb", &shard_id, &duration) || + SHARDS_read_id_and_duration(fn, ".idx", &shard_id, &duration) + ); +} -static bool is_shard_fn(const char * fn, const char * ext); -static bool is_temp_fn(const char * fn); /* * Returns 0 if successful or -1 in case of an error. @@ -41,6 +98,7 @@ int siridb_shards_load(siridb_t * siridb) struct dirent ** shard_list; char buffer[XPATH_MAX]; int n, total, rc = 0; + uint64_t shard_id, duration; memset(&st, 0, sizeof(struct stat)); @@ -48,7 +106,7 @@ int siridb_shards_load(siridb_t * siridb) siridb_misc_get_fn(path, siridb->dbpath, SIRIDB_SHARDS_PATH); - if (strlen(path) >= XPATH_MAX - SIRIDB_MAX_SHARD_FN_LEN - 1) + if (strlen(path) >= XPATH_MAX - SIRIDB_SHARD_LEN - 1) { log_error("Shard path too long: '%s'", path); return -1; @@ -77,7 +135,7 @@ int siridb_shards_load(siridb_t * siridb) for (n = 0; n < total; n++) { - if (is_temp_fn(shard_list[n]->d_name)) + if (SHARDS_is_temp_fn(shard_list[n]->d_name)) { snprintf(buffer, XPATH_MAX, "%s%s", path, shard_list[n]->d_name); @@ -92,13 +150,17 @@ int siridb_shards_load(siridb_t * siridb) } } - if (!is_shard_fn(shard_list[n]->d_name, ".sdb")) + if (!SHARDS_read_id_and_duration( + shard_list[n]->d_name, + ".sdb", + &shard_id, + &duration)) { continue; } /* we are sure this fits since the filename is checked */ - if (siridb_shard_load(siridb, (uint64_t) atoll(shard_list[n]->d_name))) + if (siridb_shard_load(siridb, shard_id, duration)) { log_error("Error while loading shard: '%s'", shard_list[n]->d_name); rc = -1; @@ -115,6 +177,11 @@ int siridb_shards_load(siridb_t * siridb) return rc; } +void siridb_shards_destroy_cb(omap_t * shards) +{ + omap_destroy(shards, (omap_destroy_cb) &siridb__shard_decref); +} + /* * Returns siri_err which is 0 if successful or a negative integer in case * of an error. (a SIGNAL is also raised in case of an error) @@ -126,6 +193,7 @@ int siridb_shards_add_points( { _Bool is_num = siridb_series_isnum(series); siridb_shard_t * shard; + omap_t * shards; uv_mutex_lock(&siridb->values_mutex); @@ -134,6 +202,19 @@ int siridb_shards_add_points( uv_mutex_unlock(&siridb->values_mutex); + if (series->interval == 0) + { + series->interval = siridb_points_get_interval(points); + + if (series->interval == 0) + { + /* fall-back to default interval */ + series->interval = siridb_shard_interval_from_duration(duration); + } + } + + duration = siridb_shard_duration_from_interval(siridb, series->interval); + uint64_t shard_start, shard_end, shard_id; uint_fast32_t start, end, num_chunks, pstart, pend; uint16_t chunk_sz; @@ -157,10 +238,28 @@ int siridb_shards_add_points( continue; } - if ((shard = imap_get(siridb->shards, shard_id)) == NULL) + shard = NULL; + shards = imap_get(siridb->shards, shard_id); + if (shards != NULL) + { + shard = omap_get(shards, duration); + /* shard may be NULL if no shard according the duration is found */ + } + else + { + shards = omap_create(); + if (shards == NULL || imap_add(siridb->shards, shard_id, shards)) + { + ERR_ALLOC + return -1; /* might leak a few bytes */ + } + } + + if (shard == NULL) { shard = siridb_shard_create( siridb, + shards, shard_id, duration, is_num ? SIRIDB_SHARD_TP_NUMBER : SIRIDB_SHARD_TP_LOG, @@ -197,8 +296,8 @@ int siridb_shards_add_points( &cinfo)) == 0) { log_critical( - "Could not write points to shard id %" PRIu64, - shard->id); + "Could not write points to shard '%s'", + shard->fn); } else { @@ -274,39 +373,3 @@ double siridb_shards_count_percent( vec_free(shards_list); return percent; } - -/* - * Returns true if fn is a shard filename, false if not. - * Argument ext should be either ".sdb" or ".idx". - */ -static bool is_shard_fn(const char * fn, const char * ext) -{ - if (!isdigit(*fn) || strlen(fn) > SIRIDB_MAX_SHARD_FN_LEN) - { - return false; - } - - fn++; - while (*fn && isdigit(*fn)) - { - fn++; - } - - return (strcmp(fn, ext) == 0); -} - -/* - * Returns true if fn is a temp shard or index filename, false if not. - */ -static bool is_temp_fn(const char * fn) -{ - int i; - for (i = 0; i < 2; i++, fn++) - { - if (*fn != '_') - { - return false; - } - } - return is_shard_fn(fn, ".sdb") || is_shard_fn(fn, ".idx"); -} diff --git a/test/test_siridb/sources b/test/test_siridb/sources index f638f653..7f4427a4 100644 --- a/test/test_siridb/sources +++ b/test/test_siridb/sources @@ -6,6 +6,7 @@ ../src/qpack/qpack.c ../src/qpjson/qpjson.c ../src/imap/imap.c +../src/omap/omap.c ../src/llist/llist.c ../src/logger/logger.c ../src/xstr/xstr.c diff --git a/test/test_siridb/test_siridb.c b/test/test_siridb/test_siridb.c index 40771c77..d1161f40 100644 --- a/test/test_siridb/test_siridb.c +++ b/test/test_siridb/test_siridb.c @@ -1,6 +1,7 @@ #include "../test.h" #include #include +#include static int test_series_ensure_type(void) @@ -107,6 +108,45 @@ static int test_series_ensure_type(void) _assert (strlen("-1") == qp_obj.len); _assert (strncmp("-1", qp_obj.via.str, qp_obj.len) == 0); } + + /* test interval */ + { + uint64_t interval, duration, test, w, d, h; + siridb_t siridb; + siridb.duration_num = 1000 * 3600 * 24 * 8; + siridb.duration_log = 1000 * 3600 * 41; + siridb.time = malloc(sizeof(siridb_time_t)); + siridb.time->factor = 1000; + + for (w = 1; w < 8; ++w) + { + duration = 3600 * 24 * 7 * w * siridb.time->factor; + interval = siridb_shard_interval_from_duration(&siridb, duration); + test = siridb_shard_duration_from_interval(&siridb, interval); + printf("%lu: %lu (%lu) %lu\n", i, duration, interval, test); + _assert (duration == test); + } + + for (d = 1; d < 8; ++d) + { + duration = 3600 * 24 * d * siridb.time->factor; + interval = siridb_shard_interval_from_duration(&siridb, duration); + test = siridb_shard_duration_from_interval(&siridb, interval); + printf("%lu: %lu (%lu) %lu\n", i, duration, interval, test); + _assert (duration == test); + } + + for (h = 1; h < 25; ++d) + { + duration = 3600 * h * siridb.time->factor; + interval = siridb_shard_interval_from_duration(&siridb, duration); + test = siridb_shard_duration_from_interval(&siridb, interval); + printf("%lu: %lu (%lu) %lu\n", i, duration, interval, test); + _assert (duration == test); + } + + free(siridb.time); + } (void) setlocale(LC_ALL, NULL); return test_end(); }; -- 2.30.2